{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Blockwise Ensemble Methods\n",
    "\n",
    "Dask-ML provides some [ensemble methods](https://ml.dask.org/modules/api.html#module-dask_ml.ensemble) that are tailored to `dask.array`'s and `dask.dataframe`'s blocked structure. The basic idea is to fit a copy of some sub-estimator to each block (or partition) of the dask Array or DataFrame. Becuase each block fits in memory, the sub-estimator only needs to handle in-memory data structures like a NumPy array or pandas DataFrame. It also will be relatively fast, since each block fits in memory and we won't need to move large amounts of data between workers on a cluster. We end up with an ensemble of models: one per block in the training dataset.\n",
    "\n",
    "At prediction time, we combine the results from all the models in the ensemble. For regression problems, this means averaging the predictions from each sub-estimator. For classification problems, each sub-estimator votes and the results are combined. See https://scikit-learn.org/stable/modules/ensemble.html#voting-classifier for details on how they can be combeind. See https://scikit-learn.org/stable/modules/ensemble.html for a general overview of why averaging ensemble methods can be useful.\n",
    "\n",
    "It's crucially important that the distribution of values in your dataset be relatively uniform across partitions. Otherwise the parameters learned on any given partition of the data will be poor for the dataset as a whole. This will be shown in detail later."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's randomly generate an example dataset. In practice, you would load the data from storage. We'll create a `dask.array` with 10 blocks."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from distributed import Client\n",
    "import dask_ml.datasets\n",
    "import dask_ml.ensemble\n",
    "\n",
    "client = Client(n_workers=4, threads_per_worker=1)\n",
    "\n",
    "X, y = dask_ml.datasets.make_classification(n_samples=1_000_000,\n",
    "                                            n_informative=10,\n",
    "                                            shift=2, scale=2,\n",
    "                                            chunks=100_000)\n",
    "X"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Classification\n",
    "\n",
    "The `sub-estimator` should be an instantiated scikit-learn-API compatible estimator (anything that implements the `fit` / `predict` API, including pipelines). It only needs to handle in-memory datasets. We'll use `sklearn.linear_model.RidgeClassifier`.\n",
    "\n",
    "To get the output shapes right, we require that you provide the `classes` for classification problems, either when creating the estimator or in `.fit` if the sub-estimator also requires the classes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sklearn.linear_model\n",
    "\n",
    "subestimator = sklearn.linear_model.RidgeClassifier(random_state=0)\n",
    "clf = dask_ml.ensemble.BlockwiseVotingClassifier(\n",
    "    subestimator,\n",
    "    classes=[0, 1]\n",
    ")\n",
    "clf"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can train normally. This will *independently* fit a clone of `subestimator` on each partition of `X` and `y`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "clf.fit(X, y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "All of the fitted estimators are available at `.estimators_`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "clf.estimators_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "These are different estimators! They've been trained on separate batches of data and have learned different parameters. We can plot the difference in the learned `coef_` of the first two models to visualize this."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import matplotlib.pyplot as plt\n",
    "import numpy as np"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "a = clf.estimators_[0].coef_\n",
    "b = clf.estimators_[1].coef_\n",
    "\n",
    "fig, ax = plt.subplots()\n",
    "ax.bar(np.arange(a.shape[1]), (a - b).ravel())\n",
    "ax.set(xticks=[], xlabel=\"Feature\", title=\"Difference in Learned Coefficients\");"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "That said, the assumption backing this entire process is that the distribution of the data is relatively uniform across partitions. The parameters learned by the each member of the ensemble should be relatively similar, and so will give relatively similar predictions when applied to the same data.\n",
    "\n",
    "When you `predict`, the result will have the same chunking pattern as the input array you're predicting for (which need not match the partitioning of the training data)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "preds = clf.predict(X)\n",
    "preds"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This generates a set of tasks that\n",
    "\n",
    "1. Calls `subestimator.predict(chunk)` for each subestimator (10 in our case)\n",
    "2. Concatenates those predictions together\n",
    "3. Somehow averages the predictions to a single overall prediction\n",
    "\n",
    "We used the default `voting=\"hard\"` strategy, which means we just choose the class that had the higest number of votes. If the first two sub-estimators picked class `0` and the other eight picked class `1` for the first row, the final prediction for that row will be class `1`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "preds[:10].compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With `voting=\"soft\"` we have access to `predict_proba`, as long as the subestimator has a `predict_proba` method. These subestimators should be well-calibrated for the predictions to be meaningful. See [probability calibration](https://scikit-learn.org/stable/modules/calibration.html#calibration) for more."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "subestimator = sklearn.linear_model.LogisticRegression(random_state=0)\n",
    "clf = dask_ml.ensemble.BlockwiseVotingClassifier(\n",
    "    subestimator,\n",
    "    classes=[0, 1],\n",
    "    voting=\"soft\"\n",
    ")\n",
    "clf.fit(X, y)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "proba = clf.predict_proba(X)\n",
    "proba[:5].compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The stages here are similar to the `voting=\"hard\"` case. Only now instead of taking the majority vote we average the probabilities predicted by each sub-estimator."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Regression\n",
    "\n",
    "Regression is quite similar. The primary difference is that there's no voting; predictions from estimators are always reduced by averaging."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "X, y = dask_ml.datasets.make_regression(n_samples=1_000_000,\n",
    "                                        chunks=100_000,\n",
    "                                        n_features=20)\n",
    "X"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "subestimator = sklearn.linear_model.LinearRegression()\n",
    "clf = dask_ml.ensemble.BlockwiseVotingRegressor(\n",
    "    subestimator,\n",
    ")\n",
    "clf.fit(X, y)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "clf.predict(X)[:5].compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As usual with Dask-ML, scoring is done in parallel (and distributed on a cluster if you're connected to one)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "clf.score(X, y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The dangers of non-uniformly distributed data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, it must be re-emphasized that your data should be uniformly distributed across partitoins prior to using these ensemble methods. If it's not, then you're better off just sampling rows from each partition and fitting a single classifer to it. By \"uniform\" we don't mean \"from a uniform probabillity distribution\". Just that there shouldn't be a clear per-partition pattern to how the data is distributed.\n",
    "\n",
    "Let's demonstrate that with an example. We'll generate a dataset with a clear trend across partitions. This might represent some non-stationary time-series, though it can occur in other contexts as well (e.g. on data partitioned by geography, age, etc.)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask.array as da\n",
    "import dask.delayed\n",
    "import sklearn.datasets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def clone_and_shift(X, y, i):\n",
    "    X = X.copy()\n",
    "    X += i + np.random.random(X.shape)\n",
    "    y += 25 * (i + np.random.random(y.shape))\n",
    "    return X, y"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Make a base dataset that we'll clone and shift\n",
    "X, y = sklearn.datasets.make_regression(n_features=4, bias=2, random_state=0)\n",
    "\n",
    "# Clone and shift 10 times, gradually increasing X and y for each partition\n",
    "Xs, ys = zip(*[dask.delayed(clone_and_shift, nout=2)(X, y, i) for i in range(10)])\n",
    "Xs = [da.from_delayed(x, shape=X.shape, dtype=X.dtype) for x in Xs]\n",
    "ys = [da.from_delayed(y_, shape=y.shape, dtype=y.dtype) for y_ in ys]\n",
    "X2 = da.concatenate(Xs)\n",
    "y2 = da.concatenate(ys)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's plot a sample of points, coloring by which partition the data came from."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "fig, ax = plt.subplots()\n",
    "ax.scatter(X2[::5, 0], y2[::5], c=np.arange(0, len(X2), 5) // 100, cmap=\"Set1\",\n",
    "           label=\"Partition\")\n",
    "ax.set(xlabel=\"Feature 0\", ylabel=\"target\", title=\"Non-stationary data (by partition)\");"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's fit two estimators:\n",
    "\n",
    "1. One `BlockwiseVotingRegressor` on the entire dataset (which fits a `LinearRegression` on each partition)\n",
    "2. One `LinearRegression` on a sample from the entire dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "subestimator = sklearn.linear_model.LinearRegression()\n",
    "clf = dask_ml.ensemble.BlockwiseVotingRegressor(\n",
    "    subestimator,\n",
    ")\n",
    "clf.fit(X2, y2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "X_sampled, y_sampled = dask.compute(X2[::10], y2[::10])\n",
    "\n",
    "subestimator.fit(X_sampled, y_sampled)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Comparing the scores, we find that the sampled dataset performs much better, despite training on less data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "clf.score(X2, y2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "subestimator.score(X2, y2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This shows that ensuring your needs to be relatively uniform across partitions. Even including the standard controls to normalize whatever underlying force is generating the non-stationary data (e.g. a time trend compontent or differencing timeseries data, dummy variables for geographic regions, etc) is not sufficient when your dataset is partioned by the non-uniform variable. You would still need to either shuffle your data prior to fitting, or just sample and fit the sub-estimator on the sub-sample that fits in memory."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
